package drds.plus.executor;

import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.common.utils.TStringUtil;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.result_cursor.EmptyResultCursor;
import drds.plus.executor.cursor.cursor.impl.result_cursor.ResultCursor;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.function.scalar.ScalarFunction;
import drds.plus.executor.utils.AtomicNumberCreator;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExplainMode;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Function;
import drds.plus.sql_process.abstract_syntax_tree.node.Node;
import drds.plus.sql_process.abstract_syntax_tree.node.query.Query;
import drds.plus.sql_process.optimizer.IOptimizer;
import drds.plus.sql_process.optimizer.OptimizerContext;
import drds.plus.sql_process.optimizer.chooser.EmptyResultFilterException;
import drds.plus.util.ExtraCmd;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

@Slf4j
public class Executor extends AbstractLifecycle implements IExecutor {

    public Logger log() {
        return log;
    }

    private static final String EXPLAIN = "explain";
    /**
     * atomicNumberCreator 生成器
     */
    private final AtomicNumberCreator atomicNumberCreator = AtomicNumberCreator.getNewInstance();

    /**
     * 核心流程 解析 优化 执行
     */
    public ResultCursor execute(ExecuteContext executeContext, String sql) throws RuntimeException {

        List list = null;
        try {
            ExplainHint explainHint = procesExplain(sql);
            if (explainHint != null) {
                sql = sql.substring(explainHint.explainIndex);
            }

            ExecutePlan executePlan = parseAndOptimize(executeContext, sql);

            if (executePlan.isExistSequenceValue()) {
                executeContext.getSession().setLastInsertId(executePlan.getLastSequenceValue());
            }

            if (ExecutePlan.USE_LAST_DATA_NODE.equals(executePlan.getDataNodeId())) {
                executePlan.setDataNodeId(ExecutePlan.DUAL_GROUP);
            }

            if (explainHint != null) {
                executePlan.setExplainMode(explainHint.explainMode); // 设置为explain标记
            }

            return execute(executeContext, executePlan);
        } catch (EmptyResultFilterException e) {
            if (e.getNode() instanceof Query) {
                list = ((Query) e.getNode()).getSelectItemList();

                if (((Query) e.getNode()).getAlias() != null) {
                    list = Utils.copyItemList(list);
                    for (Object s : list) {
                        ((Item) s).setTableName(((Query) e.getNode()).getAlias());
                    }
                }
            }
            return new EmptyResultCursor(executeContext, list);
        } catch (Exception e) {
            if (e instanceof EmptyResultFilterException) {
                EmptyResultFilterException ex = (EmptyResultFilterException) e;
                if (ex.getNode() instanceof Query) {
                    list = ((Query) ex.getNode()).getSelectItemList();

                    if (((Query) ex.getNode()).getAlias() != null) {
                        list = Utils.copyItemList(list);
                        for (Object s : list) {
                            ((Item) s).setTableName(((Query) ex.getNode()).getAlias());
                        }
                    }
                }

                return new EmptyResultCursor(executeContext, list);
            } else {
                throw new RuntimeException(e);
            }
        }
    }

    private ExplainHint procesExplain(String sql) {
        String temp = sql;
        int i = 0;
        boolean explain = false;
        for (; i < temp.length(); ++i) {
            switch (temp.charAt(i)) {
                case ' ':
                case '\t':
                case '\r':
                case '\n':
                    continue;
            }
            if (explain) { // 跳到下一个非空字符
                break;
            }

            if (TStringUtil.startsWithIgnoreCase(temp, i, EXPLAIN)) {
                i = i + EXPLAIN.length();
                explain = true;
            }
        }

        if (explain) {
            ExplainHint explainHint = new ExplainHint();
            for (ExplainMode explainMode : ExplainMode.values()) {
                if (TStringUtil.startsWithIgnoreCase(temp, i, explainMode.name())) {
                    explainHint.explainIndex = explainMode.name().length() + i;
                    explainHint.explainMode = explainMode;
                    return explainHint;
                }
            }

            explainHint.explainIndex = i;
            explainHint.explainMode = ExplainMode.SIMPLE;
            return explainHint;// 默认为simple模式
        } else {
            return null;
        }
    }

    public ExecutePlan parseAndOptimize(ExecuteContext executeContext, String sql) throws RuntimeException {
        boolean cache = ExtraCmd.getExtraCmdBoolean(executeContext.getExtraCmds(), ConnectionProperties.OPTIMIZER_CACHE, true);

        IOptimizer optimizer = OptimizerContext.getOptimizerContext().getOptimizer();

        Object object = optimizer.optimizeHintOrNode(sql, executeContext.getParameters(), cache, executeContext.getExtraCmds());

        if (object instanceof ExecutePlan) {
            return (ExecutePlan) object;
        }
        Node node = (Node) object;
        boolean existSubQuery = false;
        Function nextSubQueryFilter = node.getNextSubQueryFunction();
        while (true) {
            if (nextSubQueryFilter == null) {
                break;
            }
            existSubQuery = true;
            Query query = (Query) nextSubQueryFilter.getArgList().get(0);
            ExecutePlan executePlan = optimizer.optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(query, executeContext.getParameters(), executeContext.getExtraCmds());
            nextSubQueryFilter.getArgList().set(0, executePlan);
            Object subQueryResult = ((ScalarFunction) nextSubQueryFilter.getExtraFunction()).scalarCalucate(executeContext, null);
            //
            Map<Long, Object> subQueryCorrelateFilterIdToValueMap = new HashMap<Long, Object>();
            subQueryCorrelateFilterIdToValueMap.put(query.getSubQueryId(), subQueryResult);
            nextSubQueryFilter = optimizer.subQueryAssignValueAndReturnNextSubQueryFunction(node, subQueryCorrelateFilterIdToValueMap, executeContext.getExtraCmds());
        }
        if (existSubQuery) {// 如果存在子查询,替换子查询数据后需要重新构建下语法树,子查询会移动为subqueryFilter
            node = optimizer.optimizeNode(node, executeContext.getParameters(), executeContext.getExtraCmds());
        }
        ExecutePlan executePlan = optimizer.optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(node, executeContext.getParameters(), executeContext.getExtraCmds());
        return executePlan;

    }

    public ResultCursor execute(ExecuteContext executeContext, ExecutePlan executePlan) throws RuntimeException {
        List list = null;
        // client端核心流程y
        try {
            ISortingCursor cursor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, executePlan);
            ResultCursor resultCursor = this.wrapResultCursor(executeContext, executePlan, cursor);
            if (!executePlan.isExplain() && executePlan instanceof drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) {
                // 做下表名替换
                list = ((drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) executePlan).getItemList();
                if (((drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) executePlan).getAlias() != null) {
                    list = Utils.copyItemList(list);
                    for (Object s : list) {
                        ((Item) s).setTableName(((drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) executePlan).getAlias());
                    }
                }
                resultCursor.setOriginalSelectColumns(list);
            }
            return resultCursor;
        } catch (EmptyResultFilterException e) {
            return new EmptyResultCursor(executeContext, list);
        } catch (Exception e) {
            if (e instanceof EmptyResultFilterException) {
                return new EmptyResultCursor(executeContext, list);
            } else {
                throw new RuntimeException(e);
            }
        }
    }

    public Future<List<ISortingCursor>> executeWithFuture(ExecuteContext executeContext, List<ExecutePlan> executePlanList) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().executeWithFuture(executeContext, executePlanList);
    }

    public Future<ISortingCursor> executeWithFuture(ExecuteContext executeContext, ExecutePlan executePlan) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().executeWithFuture(executeContext, executePlan);
    }

    public ResultCursor commit(ExecuteContext executeContext) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().commit(executeContext);
    }

    public ResultCursor rollback(ExecuteContext executeContext) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().rollback(executeContext);

    }

    public Future<ResultCursor> commitWithFuture(ExecuteContext executeContext) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().commitWithFuture(executeContext);
    }

    public Future<ResultCursor> rollbackWithFuture(ExecuteContext executeContext) throws RuntimeException {
        return DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().rollbackWithFuture(executeContext);
    }

    protected ResultCursor wrapResultCursor(ExecuteContext executeContext, ExecutePlan executePlan, ISortingCursor iSortingCursor) throws RuntimeException {
        ResultCursor resultCursor;
        // 包装为可以传输的ResultCursor
        if (executePlan instanceof drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) {
            if (!(iSortingCursor instanceof ResultCursor)) {
                resultCursor = new ResultCursor(iSortingCursor, executeContext);
            } else {
                resultCursor = (ResultCursor) iSortingCursor;
            }

        } else {
            if (!(iSortingCursor instanceof ResultCursor)) {
                resultCursor = new ResultCursor(iSortingCursor, executeContext);
            } else {
                resultCursor = (ResultCursor) iSortingCursor;
            }
        }
        generateResultIdAndPutIntoResultSetMap(resultCursor);
        return resultCursor;
    }

    private void generateResultIdAndPutIntoResultSetMap(ResultCursor resultCursor) {
        int id = this.atomicNumberCreator.getIntegerNextNumber();
        resultCursor.setResultId(id);
    }

}
